package testworkflows

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/avast/retry-go/v4"
	"github.com/spf13/cobra"

	"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common"
	"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/render"
	"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/testworkflows/renderer"
	testkubecfg "github.com/kubeshop/testkube/cmd/kubectl-testkube/config"
	"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
	common2 "github.com/kubeshop/testkube/internal/common"
	apiclientv1 "github.com/kubeshop/testkube/pkg/api/v1/client"
	"github.com/kubeshop/testkube/pkg/api/v1/testkube"
	tclcmd "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/cmd"
	"github.com/kubeshop/testkube/pkg/telemetry"
	"github.com/kubeshop/testkube/pkg/testworkflows"
	"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/registry"
	"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
	"github.com/kubeshop/testkube/pkg/ui"
)

// Markers, delays and thresholds used while starting and watching test
// workflow executions.
const (
	// apiErrorMessage is the marker searched for in an API error message when
	// extracting the operation name for open-source edition errors.
	apiErrorMessage = "processing error:"
	// logsCheckDelay is the pause before re-requesting a service or parallel
	// step notification stream while the execution is not finished yet.
	logsCheckDelay  = 100 * time.Millisecond

	// logsRetryAttempts is the number of attempts made when streaming the
	// main workflow logs.
	logsRetryAttempts = 10
	// logsRetryDelay is the delay between those attempts.
	logsRetryDelay    = time.Second

	// Iteration delay thresholds and values
	// Polling iterations below initialIterationThreshold wait
	// initialIterationDelay, iterations below normalIterationThreshold wait
	// normalIterationDelay, and every later iteration waits slowIterationDelay.
	initialIterationThreshold = 5
	normalIterationThreshold  = 100
	initialIterationDelay     = 500 * time.Millisecond
	normalIterationDelay      = 1 * time.Second
	slowIterationDelay        = 5 * time.Second

	// Timestamp detection constants
	// timestampTPosition is the byte index at which the 'T' date/time
	// separator is expected in a timestamp-prefixed log line ("2006-01-02T...").
	timestampTPosition = 10
)

var (
	// NL is the line separator used when splitting raw log bytes into lines.
	NL = []byte("\n")
)

// executionError describes a failed operation on a test workflow execution.
// ExecutionID may be left empty when the failure is not tied to a specific
// execution; Cause carries the underlying error.
type executionError struct {
	Operation   string
	ExecutionID string
	Cause       error
}

// Error formats the failure as "<operation> for execution <id>: <cause>",
// leaving out the execution part when no ExecutionID is set.
func (e executionError) Error() string {
	if e.ExecutionID == "" {
		return fmt.Sprintf("%s: %v", e.Operation, e.Cause)
	}
	return fmt.Sprintf("%s for execution %s: %v", e.Operation, e.ExecutionID, e.Cause)
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can
// inspect it.
func (e executionError) Unwrap() error { return e.Cause }

// RunOptions contains all configuration for running test workflows.
// Every field is bound to a command-line flag in NewRunTestWorkflowCmd.
type RunOptions struct {
	// ExecutionName is the --name flag; empty means the name is autogenerated.
	ExecutionName            string
	// Config holds the --config name=value pairs.
	Config                   map[string]string
	// Variables holds the raw --variable entries (KEY=value), parsed later.
	Variables                []string
	// WatchEnabled is the --watch flag.
	WatchEnabled             bool
	// Silent is the --silent flag; it enables every SilentMode option.
	Silent                   bool
	// DisableWebhooks is the deprecated --disable-webhooks flag.
	DisableWebhooks          bool
	// DownloadArtifactsEnabled is the --download-artifacts flag.
	DownloadArtifactsEnabled bool
	// DownloadDir is the --download-dir flag.
	DownloadDir              string
	// Format is the --format flag (folder|archive per the flag help).
	Format                   string
	// Masks holds the --mask regular expressions for downloaded files.
	Masks                    []string
	// Tags holds the --tag name=value pairs.
	Tags                     map[string]string
	// Selectors holds the --label selectors used when no workflow name is given.
	Selectors                []string
	// ServiceName and ServiceIndex select the service whose logs are watched.
	ServiceName              string
	// ParallelStepName and ParallelStepIndex select the parallel step worker
	// whose logs are watched.
	ParallelStepName         string
	ServiceIndex             int
	ParallelStepIndex        int
	// TargetMatch, TargetNot and TargetReplicate are the --target,
	// --target-not and --target-replicate runner label flags.
	TargetMatch              []string
	TargetNot                []string
	TargetReplicate          []string
}

// ExecutionOptions contains options for processing executions.
// It is filled from RunOptions in runTestWorkflow and consumed by
// processExecutions.
type ExecutionOptions struct {
	// WatchEnabled switches between streaming logs and printing a watch hint.
	WatchEnabled             bool
	// ServiceName and ServiceIndex identify the service to watch.
	ServiceName              string
	ServiceIndex             int
	// ParallelStepName and ParallelStepIndex identify the parallel step worker to watch.
	ParallelStepName         string
	ParallelStepIndex        int
	// DownloadArtifactsEnabled, DownloadDir, Format and Masks are forwarded
	// as DownloadOptions when watching.
	DownloadArtifactsEnabled bool
	DownloadDir              string
	Format                   string
	Masks                    []string
}

// WatchOptions contains options for watching execution logs.
// It is built per execution in processExecutions and read by handleWatchMode.
type WatchOptions struct {
	// ServiceName and ServiceIndex identify the service whose logs are streamed.
	ServiceName       string
	ServiceIndex      int
	// ParallelStepName and ParallelStepIndex identify the parallel step
	// worker whose logs are streamed.
	ParallelStepName  string
	ParallelStepIndex int
	// DownloadOptions controls artifact download after watching completes.
	DownloadOptions   DownloadOptions
}

// DownloadOptions contains artifact download configuration.
type DownloadOptions struct {
	// Enabled turns artifact download on once watching has finished.
	Enabled bool
	// Dir, Format and Masks are passed through to
	// common.DownloadTestWorkflowArtifacts.
	Dir     string
	Format  string
	Masks   []string
}

// TargetOptions contains execution target configuration.
// Each field holds the raw command-line values before parsing.
type TargetOptions struct {
	// Match and Not hold "key=value1,value2" entries.
	Match     []string
	Not       []string
	// Replicate holds label names, possibly comma separated.
	Replicate []string
}

// NewRunTestWorkflowCmd creates the cobra command for running test workflows.
//
// The returned command binds all of its flags to a fresh RunOptions value
// (download dir defaults to "artifacts", format to "folder") and delegates
// execution to runTestWorkflow.
func NewRunTestWorkflowCmd() *cobra.Command {
	opts := &RunOptions{
		DownloadDir: "artifacts",
		Format:      "folder",
	}

	cmd := &cobra.Command{
		Use:     "testworkflow [name]",
		Aliases: []string{"testworkflows", "tw"},
		Short:   "Starts test workflow execution",

		Run: runTestWorkflow(opts),
	}

	cmd.Flags().StringVarP(&opts.ExecutionName, "name", "n", "", "execution name, if empty will be autogenerated")
	cmd.Flags().StringToStringVarP(&opts.Config, "config", "", map[string]string{}, "configuration variables in a form of name1=val1 passed to executor")
	cmd.Flags().StringArrayVarP(&opts.Variables, "variable", "v", []string{}, "execution variable passed to executor")
	cmd.Flags().BoolVarP(&opts.WatchEnabled, "watch", "f", false, "watch for changes after start")

	cmd.Flags().BoolVar(&opts.Silent, "silent", false, "run test workflow silently (disables webhooks, insights, health, metrics, cdevents)")
	cmd.Flags().BoolVar(&opts.DisableWebhooks, "disable-webhooks", false, "disable webhooks for this execution (deprecated: use --silent)")
	// MarkDeprecated only fails for an unknown flag or an empty message;
	// the flag was registered on the previous line, so the error is ignored.
	// NOTE: the previous MarkDeprecated("enable-webhooks", ...) call was
	// dropped because no such flag is registered on this command, so it could
	// only ever return an (ignored) "flag does not exist" error.
	_ = cmd.Flags().MarkDeprecated("disable-webhooks", "use --silent flag instead")
	cmd.Flags().StringVar(&opts.DownloadDir, "download-dir", opts.DownloadDir, "download dir")
	cmd.Flags().BoolVarP(&opts.DownloadArtifactsEnabled, "download-artifacts", "d", false, "download artifacts automatically")
	cmd.Flags().StringVar(&opts.Format, "format", opts.Format, "data format for storing files, one of folder|archive")
	cmd.Flags().StringArrayVarP(&opts.Masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$")
	cmd.Flags().StringToStringVarP(&opts.Tags, "tag", "", map[string]string{}, "execution tag adds a tag to execution in form of name1=val1 passed to executor")
	cmd.Flags().StringSliceVarP(&opts.Selectors, "label", "l", nil, "label is used to select test workflows to run using key value pair: --label key1=value1 or label expression")
	cmd.Flags().StringVar(&opts.ServiceName, "service-name", "", "test workflow service name")
	cmd.Flags().IntVar(&opts.ServiceIndex, "service-index", 0, "test workflow service index starting from 0")
	cmd.Flags().StringVar(&opts.ParallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
	cmd.Flags().IntVar(&opts.ParallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")
	cmd.Flags().StringArrayVar(&opts.TargetMatch, "target", nil, "runner labels to match")
	cmd.Flags().StringArrayVar(&opts.TargetNot, "target-not", nil, "runner labels to not match")
	cmd.Flags().StringArrayVar(&opts.TargetReplicate, "target-replicate", nil, "runner labels to replicate over")

	return cmd
}

// runTestWorkflow builds the cobra Run handler for the command.
//
// The handler resolves the client and configuration, turns the flag values
// into an execution request, starts the workflow(s), processes the resulting
// executions, and finally exits with the aggregated exit code when it is
// non-zero.
func runTestWorkflow(opts *RunOptions) func(*cobra.Command, []string) {
	return func(cmd *cobra.Command, args []string) {
		// Exit cleanly (code 0) as soon as the command context is canceled.
		ctx, cancel := context.WithCancel(cmd.Context())
		go func() {
			<-ctx.Done()
			if !errors.Is(ctx.Err(), context.Canceled) {
				return
			}
			cancel()
			os.Exit(0)
		}()

		outputPretty, namespace, client := initializeCommand(cmd)
		runContext, interfaceType := getRunContext()

		cfg, cliErr := loadConfig()
		common.HandleCLIError(cliErr)

		variables, cliErr := parseVariables(opts.Variables)
		common.HandleCLIError(cliErr)

		// --silent wins over the deprecated --disable-webhooks flag.
		var silentMode *testkube.SilentMode
		switch {
		case opts.Silent:
			silentMode = &testkube.SilentMode{
				Webhooks: true,
				Insights: true,
				Health:   true,
				Metrics:  true,
				Cdevents: true,
			}
		case opts.DisableWebhooks:
			silentMode = &testkube.SilentMode{Webhooks: true}
		}

		targets := TargetOptions{
			Match:     opts.TargetMatch,
			Not:       opts.TargetNot,
			Replicate: opts.TargetReplicate,
		}
		request, cliErr := buildExecutionRequest(cfg, runContext, interfaceType, opts.ExecutionName, opts.Config,
			variables, silentMode, opts.Tags, targets)
		common.HandleCLIError(cliErr)

		executions, err := executeWorkflows(client, args, opts.Selectors, request)
		processExecutionError(err, args, opts.Selectors, namespace)

		exitCode, cliErr := processExecutions(cmd, executions, client, outputPretty, ExecutionOptions{
			WatchEnabled:             opts.WatchEnabled,
			ServiceName:              opts.ServiceName,
			ServiceIndex:             opts.ServiceIndex,
			ParallelStepName:         opts.ParallelStepName,
			ParallelStepIndex:        opts.ParallelStepIndex,
			DownloadArtifactsEnabled: opts.DownloadArtifactsEnabled,
			DownloadDir:              opts.DownloadDir,
			Format:                   opts.Format,
			Masks:                    opts.Masks,
		}, args)
		common.HandleCLIError(cliErr)

		if exitCode == 0 {
			return
		}
		os.Exit(exitCode)
	}
}

// initializeCommand reads the output format and namespace flags and creates
// the API client for the command.
//
// It returns whether pretty output is selected (the default when no "output"
// flag is defined), the namespace flag value, and the client.
func initializeCommand(cmd *cobra.Command) (bool, string, apiclientv1.Client) {
	format := render.OutputPretty
	if flag := cmd.Flag("output"); flag != nil {
		format = render.OutputType(flag.Value.String())
	}

	namespace := cmd.Flag("namespace").Value.String()

	client, _, err := common.GetClient(cmd)
	ui.ExitOnError("getting client", err)

	return format == render.OutputPretty, namespace, client
}

// getRunContext reports the run context detected by telemetry together with
// the matching interface type: a detected context of "others|local" is
// reported as an empty context with the CLI interface type, anything else is
// returned unchanged with the CI/CD interface type.
func getRunContext() (string, testkube.TestWorkflowRunningContextInterfaceType) {
	detected := telemetry.GetCliRunContext()
	if detected != "others|local" {
		return detected, testkube.CICD_TestWorkflowRunningContextInterfaceType
	}
	return "", testkube.CLI_TestWorkflowRunningContextInterfaceType
}

// loadConfig loads the Testkube CLI configuration.
//
// On success it prints an empty line and returns the configuration; on
// failure it returns a zero configuration and a CLI error wrapping the cause.
func loadConfig() (testkubecfg.Data, *common.CLIError) {
	cfg, err := testkubecfg.Load()
	if err == nil {
		ui.NL()
		return cfg, nil
	}

	return testkubecfg.Data{}, common.NewCLIError(
		common.TKErrConfigInitFailed,
		"Error Loading Testkube Config",
		"Check that the Testkube config file (~/.testkube/config.json) exists and has correct permissions.",
		err,
	)
}

// parseVariables turns KEY=value entries into a map.
//
// Only the first '=' separates key from value, so values may contain '='.
// An entry without '=' or with an empty key yields a CLI error; when a key
// repeats, the last value wins.
func parseVariables(variables []string) (map[string]string, *common.CLIError) {
	parsed := make(map[string]string)
	for _, entry := range variables {
		key, value, hasSeparator := strings.Cut(entry, "=")
		switch {
		case !hasSeparator:
			return nil, common.NewCLIError(
				common.TKErrInvalidRuntimeParameter,
				"Invalid Variable Format",
				fmt.Sprintf("Variable '%s' must be in KEY=value format. Use --variable KEY=value (example: --variable API_KEY=secret123)", entry),
				fmt.Errorf("invalid variable format '%s'", entry),
			)
		case key == "":
			return nil, common.NewCLIError(
				common.TKErrInvalidRuntimeParameter,
				"Empty Variable Key",
				fmt.Sprintf("Variable key cannot be empty in '%s'. Use --variable KEY=value (example: --variable API_KEY=secret123)", entry),
				fmt.Errorf("empty variable key in '%s'", entry),
			)
		}
		parsed[key] = value
	}
	return parsed, nil
}

// buildExecutionRequest assembles the execution request sent to the API.
//
// The running context is only attached for cloud contexts. DisableWebhooks
// is mirrored from silentMode.Webhooks for backward compatibility, runtime
// variables are attached only when present, and the target match/not/replicate
// values are parsed from their command-line form. A CLI error is returned
// when a target entry cannot be parsed.
func buildExecutionRequest(
	cfg testkubecfg.Data,
	runContext string,
	interfaceType testkube.TestWorkflowRunningContextInterfaceType,
	executionName string,
	config map[string]string,
	variables map[string]string,
	silentMode *testkube.SilentMode,
	tags map[string]string,
	targetOpts TargetOptions,
) (testkube.TestWorkflowExecutionRequest, *common.CLIError) {
	request := testkube.TestWorkflowExecutionRequest{
		Name:       executionName,
		Config:     config,
		SilentMode: silentMode,
		Tags:       tags,
		Target:     &testkube.ExecutionTarget{},
	}

	if cfg.ContextType == testkubecfg.ContextTypeCloud {
		request.RunningContext = tclcmd.GetRunningContext(runContext, cfg.CloudContext.ApiKey, interfaceType)
	}

	// Backward compatibility: silent webhooks also set the legacy field.
	request.DisableWebhooks = silentMode != nil && silentMode.Webhooks

	if len(variables) > 0 {
		request.Runtime = &testkube.TestWorkflowExecutionRuntime{Variables: variables}
	}

	if len(targetOpts.Match) > 0 {
		parsed, cliErr := parseTargetMap(targetOpts.Match)
		if cliErr != nil {
			return testkube.TestWorkflowExecutionRequest{}, cliErr
		}
		request.Target.Match = parsed
	}

	if len(targetOpts.Not) > 0 {
		parsed, cliErr := parseTargetMap(targetOpts.Not)
		if cliErr != nil {
			return testkube.TestWorkflowExecutionRequest{}, cliErr
		}
		request.Target.Not = parsed
	}

	if len(targetOpts.Replicate) > 0 {
		// Flatten repeated and comma-separated values into one trimmed list.
		flattened := strings.Split(strings.Join(targetOpts.Replicate, ","), ",")
		request.Target.Replicate = common2.MapSlice(flattened, strings.TrimSpace)
	}

	return request, nil
}

// parseTargetMap parses "key=value1,value2" entries into a map from key to
// the list of trimmed values.
//
// The text before the first '=' is the key and must not be empty; an entry
// without '=' maps the whole entry to a single empty value. When a key
// repeats, the last entry wins. The function serves both the --target and
// --target-not flags, so the error hint names those flags.
func parseTargetMap(targets []string) (map[string][]string, *common.CLIError) {
	result := make(map[string][]string, len(targets))
	for _, target := range targets {
		key, values, _ := strings.Cut(target, "=")
		if key == "" {
			return nil, common.NewCLIError(
				common.TKErrInvalidRuntimeParameter,
				"Invalid Target Format",
				fmt.Sprintf("Target key cannot be empty in '%s'. Use --target key=value1,value2 or --target-not key=value1,value2 (example: --target env=staging,prod)", target),
				fmt.Errorf("empty target key in '%s'", target),
			)
		}
		result[key] = common2.MapSlice(strings.Split(values, ","), strings.TrimSpace)
	}
	return result, nil
}

// executeWorkflows starts the requested executions.
//
// When a positional argument is present, the workflow named by args[0] is
// executed and a one-element slice is returned (even on error). Otherwise,
// when selectors are given, they are joined with commas and every matching
// workflow is executed. With neither, a failure is reported through ui.Failf.
func executeWorkflows(client apiclientv1.Client, args []string, selectors []string,
	request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) {

	if len(args) > 0 {
		execution, err := client.ExecuteTestWorkflow(args[0], request)
		return []testkube.TestWorkflowExecution{execution}, err
	}

	if len(selectors) != 0 {
		return client.ExecuteTestWorkflows(strings.Join(selectors, ","), request)
	}

	ui.Failf("Pass Test workflow name or labels to run by labels ")
	return nil, nil
}

// processExecutionError reports a workflow start error through
// ui.ExitOnError; it is a no-op for a nil error.
//
// When the message contains the open-source operation marker, the error is
// first shortened to "<operation> <marker>", where the operation is the text
// between the last "processing error:" and the marker.
func processExecutionError(err error, args []string, selectors []string, namespace string) {
	if err == nil {
		return
	}

	// User friendly Open Source operation error
	if msg := err.Error(); strings.Contains(msg, constants.OpenSourceOperationErrorMessage) {
		from := strings.LastIndex(msg, apiErrorMessage)
		to := strings.Index(msg, constants.OpenSourceOperationErrorMessage)
		if from != -1 && to != -1 {
			from += len(apiErrorMessage)
			var operation string
			if to > from {
				operation = strings.TrimSpace(msg[from:to])
			}
			err = errors.New(operation + " " + constants.OpenSourceOperationErrorMessage)
		}
	}

	action := "execute test workflows " + strings.Join(selectors, ",")
	if len(args) > 0 {
		action = "execute test workflow " + args[0]
	}
	ui.ExitOnError(action+" from namespace "+namespace, err)
}

// processExecutions renders every started execution and, for pretty output,
// either watches it or prints a hint on how to watch it, then prints the
// execution URIs and a "get execution" hint.
//
// It returns the OR-combined exit code of all watched executions once every
// background watcher has finished. On a rendering or API failure it returns
// exit code 0 together with a CLI error describing the failure.
func processExecutions(
	cmd *cobra.Command,
	executions []testkube.TestWorkflowExecution,
	client apiclientv1.Client,
	outputPretty bool,
	opts ExecutionOptions,
	args []string,
) (int, *common.CLIError) {

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards exitCode, which background watchers also update
		exitCode int
	)

	for _, execution := range executions {
		if err := renderer.PrintTestWorkflowExecution(cmd, os.Stdout, execution); err != nil {
			return 0, common.NewCLIError(
				common.TKErrInvalidRuntimeParameter,
				"Failed to Render Execution",
				"Check that the execution data is valid and the output format is supported",
				err,
			)
		}

		if !outputPretty {
			continue
		}

		ui.NL()
		if !execution.FailedToInitialize() {
			if opts.WatchEnabled {
				watchOpts := WatchOptions{
					ServiceName:       opts.ServiceName,
					ServiceIndex:      opts.ServiceIndex,
					ParallelStepName:  opts.ParallelStepName,
					ParallelStepIndex: opts.ParallelStepIndex,
					DownloadOptions: DownloadOptions{
						Enabled: opts.DownloadArtifactsEnabled,
						Dir:     opts.DownloadDir,
						Format:  opts.Format,
						Masks:   opts.Masks,
					},
				}

				// The watcher may keep running in the background, so it gets
				// its own copy of the execution instead of a pointer to a
				// variable this loop goes on to use.
				watched := execution
				ec := handleWatchMode(cmd, args, &watched, &wg, &mu, &exitCode,
					client, watchOpts, outputPretty)

				// Merge under the lock rather than assign: a plain assignment
				// would race with background watchers and could erase a
				// failure they have already recorded.
				mu.Lock()
				exitCode |= ec
				mu.Unlock()
			} else {
				uiShellWatchExecution(execution.Id)
			}
		}

		executionId := execution.Id
		refreshed, err := client.GetTestWorkflowExecution(executionId)
		if err != nil {
			return 0, common.NewCLIError(
				common.TKErrInvalidRuntimeParameter,
				"Failed to Get Execution Status",
				fmt.Sprintf("Could not retrieve execution status for '%s'. Check that the execution exists and Testkube API is accessible", executionId),
				err,
			)
		}

		render.PrintTestWorkflowExecutionURIs(&refreshed)
		uiShellGetExecution(executionId)
	}

	// All background watchers are done after Wait, so exitCode is stable.
	wg.Wait()
	return exitCode, nil
}

// handleWatchMode watches a single execution and optionally downloads its
// artifacts afterwards.
//
// When a workflow name was passed on the command line (len(args) > 0) the
// execution is watched synchronously and its exit code is returned.
// Otherwise the watch runs in a background goroutine registered on wg; its
// log lines are prefixed with "[<workflow name>] " when the workflow is
// known, a non-zero exit code is OR-ed into *exitCode under mu, and the
// function itself returns 0 immediately.
//
// The service / parallel step filters are applied only when the
// corresponding name or index flag was explicitly set on cmd.
func handleWatchMode(
	cmd *cobra.Command,
	args []string,
	execution *testkube.TestWorkflowExecution,
	wg *sync.WaitGroup,
	mu *sync.Mutex,
	exitCode *int,
	client apiclientv1.Client,
	opts WatchOptions,
	outputPretty bool,
) int {

	var pServiceName, pParallelStepName *string
	if cmd.Flag("service-name").Changed || cmd.Flag("service-index").Changed {
		pServiceName = &opts.ServiceName
	}
	if cmd.Flag("parallel-step-name").Changed || cmd.Flag("parallel-step-index").Changed {
		pParallelStepName = &opts.ParallelStepName
	}

	if len(args) > 0 {
		ec := uiWatch(*execution, pServiceName, opts.ServiceIndex, pParallelStepName, opts.ParallelStepIndex, client)
		ui.NL()
		if opts.DownloadOptions.Enabled {
			common.DownloadTestWorkflowArtifacts(execution.Id, opts.DownloadOptions.Dir,
				opts.DownloadOptions.Format, opts.DownloadOptions.Masks, client, outputPretty)
		}
		return ec
	}

	// Copy the execution before going asynchronous: the caller keeps using
	// (and may overwrite) the value behind the pointer after this returns.
	snapshot := *execution

	prefix := ""
	if snapshot.Workflow != nil {
		prefix = fmt.Sprintf("[%s] ", snapshot.Workflow.Name)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()

		ec := uiWatch(snapshot, pServiceName, opts.ServiceIndex, pParallelStepName, opts.ParallelStepIndex,
			client, Options{Prefix: prefix})
		ui.NL()
		if opts.DownloadOptions.Enabled {
			common.DownloadTestWorkflowArtifacts(snapshot.Id, opts.DownloadOptions.Dir,
				opts.DownloadOptions.Format, opts.DownloadOptions.Masks, client, outputPretty)
		}

		if ec != 0 {
			mu.Lock()
			*exitCode |= ec
			mu.Unlock()
		}
	}()
	return 0
}

// getIterationDelay maps a polling iteration number to the delay to wait
// before it: the initial delay below initialIterationThreshold, the normal
// delay below normalIterationThreshold, and the slow delay from then on.
func getIterationDelay(iteration int) time.Duration {
	if iteration < initialIterationThreshold {
		return initialIterationDelay
	}
	if iteration < normalIterationThreshold {
		return normalIterationDelay
	}
	return slowIterationDelay
}

// Options carries optional settings for watching and printing logs.
type Options struct {
	// Prefix is prepended to printed log and status lines; empty means none.
	Prefix string
}

// uiWatch waits until the execution is assigned to a runner, prints its
// logs, and returns the process exit code for it (0 on success, 1 on
// failure).
//
// If the execution is already finished once assigned, the stored logs are
// fetched and printed in one go. Otherwise logs are streamed: from the given
// service when serviceName is non-nil, else from the given parallel step
// worker when parallelStepName is non-nil, else from the workflow itself.
// The last non-empty Prefix in options is used to prefix streamed lines.
//
// Failures to fetch the execution or its logs, and unknown service or
// parallel step names, are reported through ui.Failf / ui.ExitOnError
// (presumably terminating the process — the code below relies on that).
func uiWatch(
	execution testkube.TestWorkflowExecution,
	serviceName *string,
	serviceIndex int,
	parallelStepName *string,
	parallelStepIndex int,
	client apiclientv1.Client,
	options ...Options,
) int {
	prefix := ""
	for _, o := range options {
		if o.Prefix != "" {
			prefix = o.Prefix
		}
	}

	// Wait until the execution will be assigned to some runner
	iteration := 0
	executionId := execution.Id // Store ID before potential error
	for !execution.Assigned() {
		var err error
		iteration++
		time.Sleep(getIterationDelay(iteration))
		execution, err = client.GetTestWorkflowExecution(executionId)
		if err != nil {
			ui.Failf("failed to get execution %s: %v", executionId, err)
		}
	}

	// Print final logs in case execution is already finished
	if execution.Result.IsFinished() {
		ui.Info("Getting logs for test workflow execution", execution.Id)

		logs, err := client.GetTestWorkflowExecutionLogs(execution.Id)
		ui.ExitOnError("getting logs from executor", err)

		sigs := testworkflows.FlattenSignatures(execution.Signature)

		printRawLogLines(logs, sigs, execution, options...)
		if execution.Result.IsAnyError() {
			return 1
		}

		return 0
	}

	var result *testkube.TestWorkflowResult
	var err error

	switch {
	case serviceName != nil:
		found := false
		if execution.Workflow != nil {
			found = execution.Workflow.HasService(*serviceName)
		}

		if !found {
			ui.Failf("unknown service '%s' for test workflow execution %s", *serviceName, execution.Id)
		}

		result, err = watchTestWorkflowServiceLogs(execution.Id, prefix, *serviceName, serviceIndex, execution.Signature, client)
	case parallelStepName != nil:
		ref := execution.GetParallelStepReference(*parallelStepName)
		if ref == "" {
			ui.Failf("unknown parallel step '%s' for test workflow execution %s", *parallelStepName, execution.Id)
		}

		result, err = watchTestWorkflowParallelStepLogs(execution.Id, prefix, ref, parallelStepIndex, execution.Signature, client)
	default:
		result, err = watchTestWorkflowLogs(execution.Id, prefix, execution.Signature, client)
	}

	// A stream that ended without any result is treated as an error so that
	// result is guaranteed non-nil below.
	if result == nil && err == nil {
		err = executionError{
			Operation:   "get execution result",
			ExecutionID: executionId,
			Cause:       errors.New("no result found"),
		}
	}

	ui.ExitOnError("reading test workflow execution logs", err)

	// Apply the result in the execution
	execution.Result = result
	if result.IsFinished() {
		execution.StatusAt = result.FinishedAt
	}

	// Display message depending on the result
	switch {
	// Initialization is a pointer and may be absent from the result, so it is
	// checked before its error message is read.
	case result.Initialization != nil && result.Initialization.ErrorMessage != "":
		ui.Warn("test workflow execution failed:\n")
		ui.Errf("%s", result.Initialization.ErrorMessage)
		return 1
	case result.IsFailed():
		ui.Warn("test workflow execution failed")
		return 1
	case result.IsAborted():
		ui.Warn("test workflow execution aborted")
		return 1
	case result.IsPassed():
		ui.Success("test workflow execution completed with success in " + result.FinishedAt.Sub(result.QueuedAt).String())
	}
	return 0
}

// uiShellGetExecution prints a hint with the shell command that shows the
// details of the execution with the given id.
func uiShellGetExecution(id string) {
	const hint = "Use following command to get test workflow execution details"
	command := "kubectl testkube get twe " + id
	ui.ShellCommand(hint, command)
}

// uiShellWatchExecution prints a hint with the shell command that watches
// the execution with the given id until it completes.
func uiShellWatchExecution(id string) {
	const hint = "Watch test workflow execution until complete"
	command := "kubectl testkube watch twe " + id
	ui.ShellCommand(hint, command)
}

// printSingleResultDifference compares the status of one step in two
// consecutive results (a missing status counts as queued). When the status
// changed it prints the new status and returns true; otherwise it prints
// nothing and returns false. The step is labelled with the signature name,
// falling back to its category.
func printSingleResultDifference(r1 testkube.TestWorkflowStepResult, r2 testkube.TestWorkflowStepResult, signature testkube.TestWorkflowSignature, index int, steps int, prefix string) bool {
	before, after := testkube.QUEUED_TestWorkflowStepStatus, testkube.QUEUED_TestWorkflowStepStatus
	if r1.Status != nil {
		before = *r1.Status
	}
	if r2.Status != nil {
		after = *r2.Status
	}
	if before == after {
		return false
	}

	label := signature.Name
	if label == "" {
		label = signature.Category
	}

	duration := r2.FinishedAt.Sub(r2.QueuedAt).Round(time.Millisecond)
	printStatus(signature, after, duration, index, steps, label, r2.ErrorMessage, prefix)
	return true
}

// printResultDifference prints every step status change between two
// consecutive workflow results and reports whether anything was printed.
//
// It returns false without printing when either result is nil. The
// initialization phase is compared first (reported with index -1), followed
// by each step in signature order. A missing initialization result is
// treated as an empty (queued) step result.
func printResultDifference(res1 *testkube.TestWorkflowResult, res2 *testkube.TestWorkflowResult, steps []testkube.TestWorkflowSignature, prefix string) bool {
	if res1 == nil || res2 == nil {
		return false
	}

	var init1, init2 testkube.TestWorkflowStepResult
	if res1.Initialization != nil {
		init1 = *res1.Initialization
	}
	if res2.Initialization != nil {
		init2 = *res2.Initialization
	}

	changed := printSingleResultDifference(init1, init2, testkube.TestWorkflowSignature{Name: "Initializing"}, -1, len(steps), prefix)
	for i, s := range steps {
		// Always evaluate the comparison: folding it into `changed || ...`
		// would short-circuit and skip printing later transitions once one
		// change has been seen.
		if printSingleResultDifference(res1.Steps[s.Ref], res2.Steps[s.Ref], s, i, len(steps), prefix) {
			changed = true
		}
	}

	return changed
}

// printTestWorkflowLogs drains the notification channel until it is closed.
//
// Output notifications are skipped, result notifications print any step
// status changes and become the latest result, and all other notifications
// have their log text printed. It returns the last result seen (nil when
// there was none), or a nil result and the error when printing a log chunk
// fails.
func printTestWorkflowLogs(signature []testkube.TestWorkflowSignature, notifications chan testkube.TestWorkflowExecutionNotification, prefix string) (result *testkube.TestWorkflowResult, err error) {
	steps := testworkflows.FlattenSignatures(signature)

	atLineStart, firstNotification := true, true
	for n := range notifications {
		switch {
		case n.Output != nil:
			// Structured output is not printed.
		case n.Result != nil:
			if printResultDifference(result, n.Result, steps, prefix) {
				atLineStart = true
			}
			result = n.Result
		default:
			atLineStart, err = printStructuredLogLines(n.Log, atLineStart, firstNotification, prefix)
			if err != nil {
				return nil, err
			}
		}
		firstNotification = false
	}

	ui.NL()
	return result, nil
}

// watchWorkflowLogsCommon opens a notification stream through
// getNotifications and prints it, retrying until the stream has been printed
// without error.
//
// id is the execution whose state is consulted when the stream cannot be
// opened, prefix is prepended to printed lines, and spinnerMessage labels the
// progress spinner. It returns the last result seen on the stream, or an
// error when the stream cannot be opened and the execution is finished, has
// no result, or cannot be fetched.
func watchWorkflowLogsCommon(
	id, prefix, spinnerMessage string,
	signature []testkube.TestWorkflowSignature,
	client apiclientv1.Client,
	getNotifications func() (chan testkube.TestWorkflowExecutionNotification, error),
) (*testkube.TestWorkflowResult, error) {

	var (
		notifications chan testkube.TestWorkflowExecutionNotification
		result        *testkube.TestWorkflowResult
		nErr          error
	)

	spinner := ui.NewSpinner(spinnerMessage)
	for {
		notifications, nErr = getNotifications()
		if nErr != nil {
			// The stream is not available; decide from the execution state
			// whether waiting and retrying makes sense.
			execution, cErr := client.GetTestWorkflowExecution(id)
			if cErr != nil {
				spinner.Fail()
				return nil, cErr
			}

			if execution.Result != nil {
				if execution.Result.IsFinished() {
					// Nothing more will be streamed for a finished execution.
					nErr = executionError{
						Operation:   "watch logs",
						ExecutionID: id,
						Cause:       errors.New("execution already finished"),
					}
				} else {
					// Still in progress: wait briefly and ask for the stream again.
					time.Sleep(logsCheckDelay)
					continue
				}
			}
		}

		// Reached with the original stream error when the execution has no
		// result, or with the "already finished" error set above.
		if nErr != nil {
			spinner.Fail()
			return nil, nErr
		}

		spinner.Stop()
		result, nErr = printTestWorkflowLogs(signature, notifications, prefix)
		if nErr != nil {
			// Printing failed; request a fresh stream immediately (no delay).
			spinner.Warning("Retrying logs")
			ui.NL()
			continue
		}

		spinner.Success("Logs received")
		ui.NL()
		break
	}

	return result, nil
}

// watchTestWorkflowLogs streams and prints the logs of the whole workflow
// execution.
//
// Each attempt opens the notification stream and prints it; an attempt is
// retried when opening or printing fails, or when the stream closes while
// the last reported status is not finished. It returns the result of the
// last attempt together with the last error, if all attempts failed.
func watchTestWorkflowLogs(id, prefix string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (result *testkube.TestWorkflowResult, err error) {
	ui.Info("Getting logs from test workflow job", id)

	attempt := func() error {
		notifications, openErr := client.GetTestWorkflowExecutionNotifications(id)
		if openErr != nil {
			return openErr
		}

		var printErr error
		result, printErr = printTestWorkflowLogs(signature, notifications, prefix)
		if printErr != nil {
			return printErr
		}

		// A closed stream with an unfinished status means the stream broke.
		streamBroke := result != nil && result.Status != nil && !result.Status.Finished()
		if streamBroke {
			return fmt.Errorf("test workflow execution is not finished but channel is closed")
		}
		return nil
	}

	err = retry.Do(
		attempt,
		retry.Attempts(logsRetryAttempts),
		retry.Delay(logsRetryDelay),
		retry.LastErrorOnly(true),
	)

	return result, err
}

// watchTestWorkflowServiceLogs streams and prints the logs of one instance
// (serviceIndex) of the named service of an execution, delegating the retry
// handling to watchWorkflowLogsCommon.
func watchTestWorkflowServiceLogs(id, prefix, serviceName string, serviceIndex int,
	signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
	jobName := fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex)
	ui.Info("Getting logs from test workflow service job", jobName)

	return watchWorkflowLogsCommon(id, prefix, "Waiting for service logs", signature, client,
		func() (chan testkube.TestWorkflowExecutionNotification, error) {
			return client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex)
		})
}

// watchTestWorkflowParallelStepLogs streams and prints the logs of one
// worker (workerIndex) of the parallel step identified by ref, delegating
// the retry handling to watchWorkflowLogsCommon.
func watchTestWorkflowParallelStepLogs(id, prefix, ref string, workerIndex int,
	signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
	jobName := fmt.Sprintf("%s-%s-%d", id, ref, workerIndex)
	ui.Info("Getting logs from test workflow parallel step job", jobName)

	return watchWorkflowLogsCommon(id, prefix, "Waiting for parallel step logs", signature, client,
		func() (chan testkube.TestWorkflowExecutionNotification, error) {
			return client.GetTestWorkflowExecutionParallelStepNotifications(id, ref, workerIndex)
		})
}

// printStatusHeader prints a step header on a new line. For i == -1 only the
// name is shown; otherwise the header includes the 1-based position "(i+1/n)".
func printStatusHeader(i, n int, name, prefix string) {
	header := fmt.Sprintf("• (%d/%d) %s", i+1, n, name)
	if i == -1 {
		header = fmt.Sprintf("• %s", name)
	}
	fmt.Println("\n" + prefix + ui.LightCyan(header))
}

// printStatus prints the status line of a step.
//
// A non-empty errorMessage is printed first in red. A running step prints
// its header, a skipped step prints a gray marker without a leading blank
// line, and passed/aborted steps print their own markers. Any other status
// is printed with its duration: yellow and marked "(ignored)" for optional
// steps, red otherwise.
func printStatus(s testkube.TestWorkflowSignature, rStatus testkube.TestWorkflowStepStatus, took time.Duration,
	i, n int, name string, errorMessage, prefix string) {
	if errorMessage != "" {
		fmt.Printf("\n%s%s", prefix, ui.Red(errorMessage))
	}

	if rStatus == testkube.RUNNING_TestWorkflowStepStatus {
		printStatusHeader(i, n, name, prefix)
		return
	}
	if rStatus == testkube.SKIPPED_TestWorkflowStepStatus {
		fmt.Println(prefix + ui.LightGray("• skipped"))
		return
	}

	var marker string
	switch {
	case rStatus == testkube.PASSED_TestWorkflowStepStatus:
		marker = ui.Green(fmt.Sprintf("• passed in %s", took))
	case rStatus == testkube.ABORTED_TestWorkflowStepStatus:
		marker = ui.Red("• aborted")
	case s.Optional:
		marker = ui.Yellow(fmt.Sprintf("• %s in %s (ignored)", string(rStatus), took))
	default:
		marker = ui.Red(fmt.Sprintf("• %s in %s", string(rStatus), took))
	}
	fmt.Println("\n" + prefix + marker)
}

// trimTimestamp removes a leading timestamp from a log line.
//
// A line is treated as timestamped only when it starts with a date of the
// form "NNNN-NN-NN" immediately followed by 'T' (as in
// "2006-01-02T15:04:05Z message"). In that case everything up to and
// including the first space is dropped. Lines that do not match, or that
// contain no space after the timestamp, are returned unchanged.
//
// Checking the date prefix, rather than only the position of the first 'T',
// keeps ordinary lines such as "processingTime 5ms" intact.
func trimTimestamp(line string) string {
	// Index of the 'T' separator, right after the date part.
	const tPos = len("2006-01-02")

	if len(line) <= tPos || line[tPos] != 'T' || !hasISODatePrefix(line) {
		return line
	}

	// The date prefix holds no spaces, so the first space ends the timestamp.
	sep := strings.IndexByte(line, ' ')
	if sep == -1 {
		return line
	}
	return line[sep+1:]
}

// hasISODatePrefix reports whether line starts with "NNNN-NN-NN", where N is
// an ASCII digit.
func hasISODatePrefix(line string) bool {
	const layout = "2006-01-02"
	if len(line) < len(layout) {
		return false
	}
	for i := 0; i < len(layout); i++ {
		c := line[i]
		if layout[i] == '-' {
			if c != '-' {
				return false
			}
			continue
		}
		if c < '0' || c > '9' {
			return false
		}
	}
	return true
}

// printStructuredLogLines prints one chunk of streamed log text with the
// timestamp of every line removed; lines after the first are started on a
// new line followed by prefix.
//
// isLineBeginning carries the line state from the previous chunk; the
// returned bool is the new state (true when this chunk ended with a
// newline). An empty chunk prints nothing and returns isLineBeginning
// unchanged. When isFirstLine is set and a line equals the "resource not
// found" error text, registry.ErrResourceNotFound is returned so the caller
// can retry.
func printStructuredLogLines(logs string, isLineBeginning, isFirstLine bool, prefix string) (bool, error) {
	if len(logs) == 0 {
		return isLineBeginning, nil
	}
	willBeLineBeginning := logs[len(logs)-1] == '\n'

	scanner := bufio.NewScanner(strings.NewReader(logs))
	// The default scanner limit (bufio.MaxScanTokenSize) makes Scan stop
	// silently on a longer line, dropping the rest of the chunk. Allow a
	// single line to be as long as the whole chunk.
	if len(logs) >= bufio.MaxScanTokenSize {
		scanner.Buffer(nil, len(logs)+1)
	}

	next := false
	for scanner.Scan() {
		if next {
			fmt.Print("\n" + prefix)
		}
		text := trimTimestamp(scanner.Text())
		if isFirstLine && text == registry.ErrResourceNotFound.Error() {
			return isLineBeginning, registry.ErrResourceNotFound
		}
		fmt.Print(text)
		next = true
	}
	if isLineBeginning {
		fmt.Print("\n" + prefix)
	}
	return willBeLineBeginning, nil
}

// printRawLogLines prints the stored logs of an execution, interleaving step
// headers and step results.
//
// When there are no logs and only an initialization error is known, just
// that error is printed. Otherwise an "Initializing" header is printed, the
// log lines are processed, and the results of the steps that were not
// reached while processing are printed at the end.
func printRawLogLines(logs []byte, steps []testkube.TestWorkflowSignature, execution testkube.TestWorkflowExecution, options ...Options) {
	var state logPrintState
	state.stepIndex = -1
	state.prefix = extractPrefix(options)
	state.results = extractResults(execution)

	// Handle case where only error message is available
	if shouldPrintOnlyError(state.results, logs) {
		initError := state.results[""].ErrorMessage
		fmt.Printf("\n%s%s\n", state.prefix, ui.Red(initError))
		return
	}

	printStatusHeader(-1, len(steps), "Initializing", state.prefix)
	processLogLines(logs, steps, &state)
	printRemainingSteps(steps, &state)
}

// logPrintState maintains state while processing log lines.
type logPrintState struct {
	// currentRef is the reference of the step currently being printed;
	// empty before the first step is reached.
	currentRef string
	// stepIndex is the index of the current step in the flattened step
	// list; -1 before the first step is reached.
	stepIndex  int
	// prefix is prepended to every printed line.
	prefix     string
	// results maps step references to their results; the initialization
	// result is stored under the empty key.
	results    map[string]testkube.TestWorkflowStepResult
}

// extractPrefix returns the first non-empty Prefix found in options, or an
// empty string when there is none.
func extractPrefix(options []Options) string {
	for i := range options {
		if candidate := options[i].Prefix; candidate != "" {
			return candidate
		}
	}
	return ""
}

// extractResults builds a lookup of step results keyed by step reference,
// with the initialization result (when present) stored under the empty key.
//
// The returned map is always non-nil and is a copy: adding the
// initialization entry must not modify the execution's own Steps map.
func extractResults(execution testkube.TestWorkflowExecution) map[string]testkube.TestWorkflowStepResult {
	results := make(map[string]testkube.TestWorkflowStepResult)
	if execution.Result == nil {
		return results
	}

	for ref, step := range execution.Result.Steps {
		results[ref] = step
	}
	if execution.Result.Initialization != nil {
		results[""] = *execution.Result.Initialization
	}
	return results
}

// shouldPrintOnlyError reports whether only the initialization error should
// be printed: there are no logs, at most one result entry, and the
// initialization entry (empty key) carries an error message.
func shouldPrintOnlyError(results map[string]testkube.TestWorkflowStepResult, logs []byte) bool {
	if len(logs) != 0 || len(results) >= 2 {
		return false
	}
	return results[""].ErrorMessage != ""
}

// processLogLines walks the raw logs line by line. A line matching the step
// start hint advances the printing state to the referenced step; every other
// line is printed with the prefix, its timestamp removed, and a trailing
// "\x07" character appended.
func processLogLines(logs []byte, steps []testkube.TestWorkflowSignature, state *logPrintState) {
	for len(logs) > 0 {
		var raw string
		raw, logs = extractNextLine(logs)

		text := trimTimestamp(raw)

		if hint := instructions.StartHintRe.FindStringSubmatch(text); len(hint) != 0 {
			advanceToStep(hint[1], steps, state)
			continue
		}

		fmt.Println(state.prefix + text + "\x07")
	}
}

// extractNextLine returns the text before the first NL separator and the
// bytes after it. When logs holds no separator, the whole input is returned
// as the line and the remainder is nil.
func extractNextLine(logs []byte) (string, []byte) {
	head, tail, found := bytes.Cut(logs, NL)
	if !found {
		return string(logs), nil
	}
	return string(head), tail
}

// advanceToStep moves the printing state forward until the current step's
// reference equals targetRef or the steps are exhausted. Before each move the
// result of the step being left is printed (if known); after each move the
// header of the newly entered step is printed. From the initial state
// (stepIndex == -1) at least one move is always made.
func advanceToStep(targetRef string, steps []testkube.TestWorkflowSignature, state *logPrintState) {
	for {
		if state.stepIndex != -1 {
			exhausted := state.stepIndex >= len(steps)
			if exhausted || steps[state.stepIndex].Ref == targetRef {
				return
			}
		}

		printStepResultIfExists(steps, state)

		state.stepIndex++
		if state.stepIndex >= len(steps) {
			continue
		}
		entered := steps[state.stepIndex]
		state.currentRef = entered.Ref
		printStatusHeader(state.stepIndex, len(steps), steps[state.stepIndex].Label(), state.prefix)
	}
}

// printStepResultIfExists prints the status of the current step when a
// result with a status is recorded for state.currentRef and state.stepIndex
// points at a valid step; otherwise it does nothing.
func printStepResultIfExists(steps []testkube.TestWorkflowSignature, state *logPrintState) {
	stepResult, known := state.results[state.currentRef]
	if !known || stepResult.Status == nil {
		return
	}

	idx := state.stepIndex
	if idx == -1 || idx >= len(steps) {
		return
	}

	duration := stepResult.FinishedAt.Sub(stepResult.QueuedAt).Round(time.Millisecond)
	printStatus(steps[idx], *stepResult.Status, duration, idx, len(steps),
		steps[idx].Label(), stepResult.ErrorMessage, state.prefix)
}

// printRemainingSteps walks the steps from the current index to the end.
// For each one it prints the recorded result (if any), advances the state
// past it, and prints the header of the following step when there is one.
// It does nothing when no step has been reached yet (stepIndex == -1) or
// when all steps have already been handled.
func printRemainingSteps(steps []testkube.TestWorkflowSignature, state *logPrintState) {
	if state.stepIndex == -1 || state.stepIndex >= len(steps) {
		return
	}

	total := len(steps)
	for i := state.stepIndex; i < total; i++ {
		step := steps[i]
		if stepResult, known := state.results[step.Ref]; known && stepResult.Status != nil {
			duration := stepResult.FinishedAt.Sub(stepResult.QueuedAt).Round(time.Millisecond)
			printStatus(step, *stepResult.Status, duration, i, total,
				steps[i].Label(), stepResult.ErrorMessage, state.prefix)
		}

		following := i + 1
		state.stepIndex = following
		state.currentRef = step.Ref
		if following < total {
			printStatusHeader(following, total, steps[following].Label(), state.prefix)
		}
	}
}
